{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Model Deployment with Spark Serving\n",
    "In this example, we train a model to predict income from the *Adult Census* dataset, then use Spark Serving to deploy it as a real-time web service."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's read the data and split it into training and test sets:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "data = spark.read.parquet(\n",
    "    \"wasbs://publicwasb@mmlspark.blob.core.windows.net/AdultCensusIncome.parquet\"\n",
    ")\n",
    "data = data.select([\"education\", \"marital-status\", \"hours-per-week\", \"income\"])\n",
    "train, test = data.randomSplit([0.75, 0.25], seed=123)\n",
    "train.limit(10).toPandas()"
   ]
  },
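  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A seeded random split assigns each row to a split by an independent random draw, so the resulting sizes are only approximately 75/25. The plain-Python sketch below illustrates the idea. It is not Spark's implementation; `random_split` and the toy `rows` list are hypothetical names used only for this example.\n",
    "\n",
    "```python\n",
    "import random\n",
    "\n",
    "\n",
    "def random_split(rows, weights, seed):\n",
    "    # Assign each row to one split by an independent weighted random draw.\n",
    "    rng = random.Random(seed)\n",
    "    splits = [[] for _ in weights]\n",
    "    indices = range(len(weights))\n",
    "    for row in rows:\n",
    "        chosen = rng.choices(indices, weights=weights, k=1)[0]\n",
    "        splits[chosen].append(row)\n",
    "    return splits\n",
    "\n",
    "\n",
    "rows = list(range(1000))  # toy stand-in for DataFrame rows\n",
    "train_rows, test_rows = random_split(rows, [0.75, 0.25], seed=123)\n",
    "print(len(train_rows), len(test_rows))\n",
    "```\n",
    "\n",
    "Fixing the seed makes the split reproducible across runs, which is why the cell above passes `seed=123`."
   ]
  },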
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`TrainClassifier` wraps SparkML classifiers and can be used to initialize and fit a model.\n",
    "You can use `help(synapse.ml.train.TrainClassifier)` to view its parameters.\n",
    "\n",
    "Note that it implicitly converts the data into the format expected by the algorithm. More specifically, it\n",
    "tokenizes and hashes strings, one-hot encodes categorical variables, assembles the features into a vector,\n",
    "and so on. The parameter `numFeatures` controls the number of hashed features."
   ]
  },
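  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The effect of `numFeatures` can be illustrated with a small plain-Python sketch of feature hashing. This is only an illustration under stated assumptions: `hash_features` is a hypothetical helper, and `zlib.crc32` stands in for whatever hash function the library actually uses.\n",
    "\n",
    "```python\n",
    "import zlib\n",
    "\n",
    "\n",
    "def hash_features(tokens, num_features=256):\n",
    "    # Map each token to a bucket index in [0, num_features) and count hits.\n",
    "    vector = [0] * num_features\n",
    "    for token in tokens:\n",
    "        index = zlib.crc32(token.encode('utf-8')) % num_features\n",
    "        vector[index] += 1\n",
    "    return vector\n",
    "\n",
    "\n",
    "vec = hash_features(['Masters', 'Married-civ-spouse'], num_features=256)\n",
    "print(len(vec), sum(vec))\n",
    "```\n",
    "\n",
    "A smaller `num_features` makes it more likely that different tokens collide in the same bucket, while a larger value reduces collisions at the cost of a wider feature vector."
   ]
  },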
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from synapse.ml.train import TrainClassifier\n",
    "from pyspark.ml.classification import LogisticRegression\n",
    "\n",
    "model = TrainClassifier(\n",
    "    model=LogisticRegression(), labelCol=\"income\", numFeatures=256\n",
    ").fit(train)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After the model is trained, we score it against the test dataset and view metrics."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.train import ComputeModelStatistics\n",
    "\n",
    "prediction = model.transform(test)\n",
    "prediction.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics().transform(prediction)\n",
    "metrics.limit(10).toPandas()"
   ]
  },
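  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the metrics easier to interpret, the sketch below computes accuracy, precision, and recall by hand for a toy binary problem. It assumes these are among the metrics of interest and does not reproduce how `ComputeModelStatistics` works internally; `binary_metrics` and the toy lists are hypothetical.\n",
    "\n",
    "```python\n",
    "def binary_metrics(labels, predictions, positive=1):\n",
    "    # Count the confusion-matrix cells for the chosen positive class.\n",
    "    pairs = list(zip(labels, predictions))\n",
    "    tp = sum(1 for y, p in pairs if y == positive and p == positive)\n",
    "    fp = sum(1 for y, p in pairs if y != positive and p == positive)\n",
    "    fn = sum(1 for y, p in pairs if y == positive and p != positive)\n",
    "    tn = sum(1 for y, p in pairs if y != positive and p != positive)\n",
    "    accuracy = (tp + tn) / len(pairs)\n",
    "    # Guard against division by zero when a class is never predicted or present.\n",
    "    precision = tp / (tp + fp) if (tp + fp) else 0.0\n",
    "    recall = tp / (tp + fn) if (tp + fn) else 0.0\n",
    "    return {'accuracy': accuracy, 'precision': precision, 'recall': recall}\n",
    "\n",
    "\n",
    "labels = [1, 0, 1, 1, 0, 0]\n",
    "predictions = [1, 0, 0, 1, 1, 0]\n",
    "stats = binary_metrics(labels, predictions)\n",
    "print(stats)\n",
    "```\n",
    "\n",
    "Accuracy alone can be misleading when one income class dominates, so it helps to read precision and recall alongside it."
   ]
  },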
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, we define the web service input and output.\n",
    "For more information, see the [documentation for Spark Serving](https://github.com/Microsoft/SynapseML/blob/master/docs/mmlspark-serving.md)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import *\n",
    "from synapse.ml.io import *\n",
    "import uuid\n",
    "\n",
    "# Listen for HTTP requests on localhost:8898/my_api and parse each request body\n",
    "# into columns that follow the schema of the test DataFrame.\n",
    "serving_inputs = (\n",
    "    spark.readStream.server()\n",
    "    .address(\"localhost\", 8898, \"my_api\")\n",
    "    .option(\"name\", \"my_api\")\n",
    "    .load()\n",
    "    .parseRequest(\"my_api\", test.schema)\n",
    ")\n",
    "\n",
    "# Score each incoming request and use the prediction column as the reply.\n",
    "serving_outputs = model.transform(serving_inputs).makeReply(\"prediction\")\n",
    "\n",
    "# Start the streaming query that sends each reply back through my_api.\n",
    "server = (\n",
    "    serving_outputs.writeStream.server()\n",
    "    .replyTo(\"my_api\")\n",
    "    .queryName(\"my_query\")\n",
    "    .option(\"checkpointLocation\", \"file:///tmp/checkpoints-{}\".format(uuid.uuid1()))\n",
    "    .start()\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Test the web service by sending it JSON requests:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "data = '{\"education\":\" 10th\",\"marital-status\":\"Divorced\",\"hours-per-week\":40.0}'\n",
    "r = requests.post(data=data, url=\"http://localhost:8898/my_api\")\n",
    "print(\"Response {}\".format(r.text))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests\n",
    "\n",
    "data = '{\"education\":\" Masters\",\"marital-status\":\"Married-civ-spouse\",\"hours-per-week\":40.0}'\n",
    "r = requests.post(data=data, url=\"http://localhost:8898/my_api\")\n",
    "print(\"Response {}\".format(r.text))"
   ]
  },
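  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Hand-written JSON strings are easy to get wrong. As a sketch, the request body could instead be built with `json.dumps`. Here `build_payload` is a hypothetical helper, and its field names are assumed to match the columns selected from the dataset earlier.\n",
    "\n",
    "```python\n",
    "import json\n",
    "\n",
    "\n",
    "def build_payload(education, marital_status, hours_per_week):\n",
    "    # Field names mirror the feature columns used to train the model.\n",
    "    return json.dumps(\n",
    "        {\n",
    "            'education': education,\n",
    "            'marital-status': marital_status,\n",
    "            'hours-per-week': float(hours_per_week),\n",
    "        }\n",
    "    )\n",
    "\n",
    "\n",
    "payload = build_payload(' Masters', 'Married-civ-spouse', 40)\n",
    "print(payload)\n",
    "```\n",
    "\n",
    "The resulting string can be passed as the `data` argument of `requests.post`, in the same way as the hand-written strings in the cells above."
   ]
  },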
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import time\n",
    "\n",
    "time.sleep(20)  # keep the server running briefly before shutting it down (just to be safe)\n",
    "server.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  },
  "name": "107 - Model Deployment with Spark Serving"
 },
 "nbformat": 4,
 "nbformat_minor": 2
}